{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "07de9dc3",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "ca5bbb06",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "WARNING: An illegal reflective access operation has occurred\n",
      "WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
      "WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
      "WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
      "WARNING: All illegal access operations will be denied in a future release\n",
      "22/02/16 21:11:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
     ]
    }
   ],
   "source": [
    "spark = SparkSession.builder \\\n",
    "    .master(\"local[*]\") \\\n",
    "    .appName('test') \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "cf8de204",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "--2022-02-16 21:13:50--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-01.csv\n",
      "Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.84.132\n",
      "Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.84.132|:443... connected.\n",
      "HTTP request sent, awaiting response... 200 OK\n",
      "Length: 752335705 (717M) [text/csv]\n",
      "Saving to: ‘fhvhv_tripdata_2021-01.csv’\n",
      "\n",
      "fhvhv_tripdata_2021 100%[===================>] 717.48M  35.6MB/s    in 21s     \n",
      "\n",
      "2022-02-16 21:14:11 (34.4 MB/s) - ‘fhvhv_tripdata_2021-01.csv’ saved [752335705/752335705]\n",
      "\n"
     ]
    }
   ],
   "source": [
    "!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "201a5957",
   "metadata": {},
   "outputs": [],
   "source": [
    "!gzip -dc fhvhv_tripdata_2021-01.csv.gz"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "2a52087c",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "11908469 fhvhv_tripdata_2021-01.csv\r\n"
     ]
    }
   ],
   "source": [
    "!wc -l fhvhv_tripdata_2021-01.csv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "931021a7",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read \\\n",
    "    .option(\"header\", \"true\") \\\n",
    "    .csv('fhvhv_tripdata_2021-01.csv')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "d44b7839",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,StringType,true),StructField(DOLocationID,StringType,true),StructField(SR_Flag,StringType,true)))"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "4249e790",
   "metadata": {},
   "outputs": [],
   "source": [
    "!head -n 1001 fhvhv_tripdata_2021-01.csv > head.csv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "6894312c",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "f3ca771b",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_pandas = pd.read_csv('head.csv')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "f1066b4f",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "hvfhs_license_num        object\n",
       "dispatching_base_num     object\n",
       "pickup_datetime          object\n",
       "dropoff_datetime         object\n",
       "PULocationID              int64\n",
       "DOLocationID              int64\n",
       "SR_Flag                 float64\n",
       "dtype: object"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_pandas.dtypes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "f8413c9d",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(SR_Flag,DoubleType,true)))"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark.createDataFrame(df_pandas).schema"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "80f252c1",
   "metadata": {},
   "source": [
    "Integer - 4 bytes\n",
    "Long - 8 bytes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "16937bfd",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import types"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "fc61a99a",
   "metadata": {},
   "outputs": [],
   "source": [
    "schema = types.StructType([\n",
    "    types.StructField('hvfhs_license_num', types.StringType(), True),\n",
    "    types.StructField('dispatching_base_num', types.StringType(), True),\n",
    "    types.StructField('pickup_datetime', types.TimestampType(), True),\n",
    "    types.StructField('dropoff_datetime', types.TimestampType(), True),\n",
    "    types.StructField('PULocationID', types.IntegerType(), True),\n",
    "    types.StructField('DOLocationID', types.IntegerType(), True),\n",
    "    types.StructField('SR_Flag', types.StringType(), True)\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "id": "f94052ae",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read \\\n",
    "    .option(\"header\", \"true\") \\\n",
    "    .schema(schema) \\\n",
    "    .csv('fhvhv_tripdata_2021-01.csv')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "id": "c270d9d6",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = df.repartition(24)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7796c2b2",
   "metadata": {},
   "outputs": [],
   "source": [
    "df.write.parquet('fhvhv/2021/01/')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "id": "c3cab876",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read.parquet('fhvhv/2021/01/')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "id": "203b5627",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- hvfhs_license_num: string (nullable = true)\n",
      " |-- dispatching_base_num: string (nullable = true)\n",
      " |-- pickup_datetime: timestamp (nullable = true)\n",
      " |-- dropoff_datetime: timestamp (nullable = true)\n",
      " |-- PULocationID: integer (nullable = true)\n",
      " |-- DOLocationID: integer (nullable = true)\n",
      " |-- SR_Flag: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "64172a47",
   "metadata": {},
   "source": [
    "SELECT * FROM df WHERE hvfhs_license_num =  HV0003"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 56,
   "id": "d24840a0",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import functions as F"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 61,
   "id": "3ab1ca44",
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+\n",
      "|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|\n",
      "+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+\n",
      "|           HV0005|              B02510|2021-01-07 06:43:22|2021-01-07 06:55:06|         142|         230|   null|\n",
      "|           HV0005|              B02510|2021-01-01 16:01:26|2021-01-01 16:20:20|         133|          91|   null|\n",
      "|           HV0003|              B02764|2021-01-01 00:23:13|2021-01-01 00:30:35|         147|         159|   null|\n",
      "|           HV0003|              B02869|2021-01-06 11:43:12|2021-01-06 11:55:07|          79|         164|   null|\n",
      "|           HV0003|              B02884|2021-01-04 15:35:32|2021-01-04 15:52:02|         174|          18|   null|\n",
      "|           HV0003|              B02875|2021-01-04 13:42:15|2021-01-04 14:04:57|         201|         180|   null|\n",
      "|           HV0005|              B02510|2021-01-04 18:57:31|2021-01-04 19:09:55|         230|         142|   null|\n",
      "|           HV0003|              B02872|2021-01-03 18:42:03|2021-01-03 19:12:22|         132|          72|   null|\n",
      "|           HV0004|              B02800|2021-01-01 05:31:50|2021-01-01 05:40:03|         188|          61|   null|\n",
      "|           HV0005|              B02510|2021-01-04 20:21:47|2021-01-04 20:26:03|          97|         189|   null|\n",
      "|           HV0003|              B02764|2021-01-01 01:51:18|2021-01-01 02:05:32|         174|         235|   null|\n",
      "|           HV0003|              B02871|2021-01-05 10:20:54|2021-01-05 10:32:44|          35|          76|   null|\n",
      "|           HV0005|              B02510|2021-01-06 02:32:09|2021-01-06 02:43:35|          35|          39|   null|\n",
      "|           HV0003|              B02882|2021-01-04 12:34:52|2021-01-04 12:38:59|         231|          13|   null|\n",
      "|           HV0003|              B02617|2021-01-02 20:12:56|2021-01-02 20:41:18|          87|         127|   null|\n",
      "|           HV0005|              B02510|2021-01-02 16:55:48|2021-01-02 17:20:40|          17|          89|   null|\n",
      "|           HV0003|              B02869|2021-01-02 15:14:38|2021-01-02 15:23:27|          11|          14|   null|\n",
      "|           HV0005|              B02510|2021-01-01 05:54:50|2021-01-01 06:03:46|          21|          26|   null|\n",
      "|           HV0003|              B02869|2021-01-04 12:40:42|2021-01-04 12:48:34|          83|         260|   null|\n",
      "|           HV0005|              B02510|2021-01-01 14:58:57|2021-01-01 15:09:53|         189|          52|   null|\n",
      "+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 63,
   "id": "6d98c2ce",
   "metadata": {},
   "outputs": [],
   "source": [
    "def crazy_stuff(base_num):\n",
    "    num = int(base_num[1:])\n",
    "    if num % 7 == 0:\n",
    "        return f's/{num:03x}'\n",
    "    elif num % 3 == 0:\n",
    "        return f'a/{num:03x}'\n",
    "    else:\n",
    "        return f'e/{num:03x}'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 65,
   "id": "f3175419",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'s/b44'"
      ]
     },
     "execution_count": 65,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "crazy_stuff('B02884')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 66,
   "id": "9bb5d503",
   "metadata": {},
   "outputs": [],
   "source": [
    "crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 67,
   "id": "b38f0465",
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-----------+------------+------------+------------+\n",
      "|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|\n",
      "+-------+-----------+------------+------------+------------+\n",
      "|  e/9ce| 2021-01-07|  2021-01-07|         142|         230|\n",
      "|  e/9ce| 2021-01-01|  2021-01-01|         133|          91|\n",
      "|  e/acc| 2021-01-01|  2021-01-01|         147|         159|\n",
      "|  e/b35| 2021-01-06|  2021-01-06|          79|         164|\n",
      "|  s/b44| 2021-01-04|  2021-01-04|         174|          18|\n",
      "|  e/b3b| 2021-01-04|  2021-01-04|         201|         180|\n",
      "|  e/9ce| 2021-01-04|  2021-01-04|         230|         142|\n",
      "|  e/b38| 2021-01-03|  2021-01-03|         132|          72|\n",
      "|  s/af0| 2021-01-01|  2021-01-01|         188|          61|\n",
      "|  e/9ce| 2021-01-04|  2021-01-04|          97|         189|\n",
      "|  e/acc| 2021-01-01|  2021-01-01|         174|         235|\n",
      "|  a/b37| 2021-01-05|  2021-01-05|          35|          76|\n",
      "|  e/9ce| 2021-01-06|  2021-01-06|          35|          39|\n",
      "|  e/b42| 2021-01-04|  2021-01-04|         231|          13|\n",
      "|  e/a39| 2021-01-02|  2021-01-02|          87|         127|\n",
      "|  e/9ce| 2021-01-02|  2021-01-02|          17|          89|\n",
      "|  e/b35| 2021-01-02|  2021-01-02|          11|          14|\n",
      "|  e/9ce| 2021-01-01|  2021-01-01|          21|          26|\n",
      "|  e/b35| 2021-01-04|  2021-01-04|          83|         260|\n",
      "|  e/9ce| 2021-01-01|  2021-01-01|         189|          52|\n",
      "+-------+-----------+------------+------------+------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df \\\n",
    "    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \\\n",
    "    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \\\n",
    "    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \\\n",
    "    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \\\n",
    "    .show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 55,
   "id": "00921644",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(pickup_datetime=datetime.datetime(2021, 1, 1, 0, 23, 13), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 30, 35), PULocationID=147, DOLocationID=159),\n",
       " Row(pickup_datetime=datetime.datetime(2021, 1, 6, 11, 43, 12), dropoff_datetime=datetime.datetime(2021, 1, 6, 11, 55, 7), PULocationID=79, DOLocationID=164),\n",
       " Row(pickup_datetime=datetime.datetime(2021, 1, 4, 15, 35, 32), dropoff_datetime=datetime.datetime(2021, 1, 4, 15, 52, 2), PULocationID=174, DOLocationID=18),\n",
       " Row(pickup_datetime=datetime.datetime(2021, 1, 4, 13, 42, 15), dropoff_datetime=datetime.datetime(2021, 1, 4, 14, 4, 57), PULocationID=201, DOLocationID=180),\n",
       " Row(pickup_datetime=datetime.datetime(2021, 1, 3, 18, 42, 3), dropoff_datetime=datetime.datetime(2021, 1, 3, 19, 12, 22), PULocationID=132, DOLocationID=72)]"
      ]
     },
     "execution_count": 55,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \\\n",
    "  .filter(df.hvfhs_license_num == 'HV0003')\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "id": "0866f9c0",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag\r\n",
      "\r\n",
      "HV0003,B02682,2021-01-01 00:33:44,2021-01-01 00:49:07,230,166,\r\n",
      "\r\n",
      "HV0003,B02682,2021-01-01 00:55:19,2021-01-01 01:18:21,152,167,\r\n",
      "\r\n",
      "HV0003,B02764,2021-01-01 00:23:56,2021-01-01 00:38:05,233,142,\r\n",
      "\r\n",
      "HV0003,B02764,2021-01-01 00:42:51,2021-01-01 00:45:50,142,143,\r\n",
      "\r\n",
      "HV0003,B02764,2021-01-01 00:48:14,2021-01-01 01:08:42,143,78,\r\n",
      "\r\n",
      "HV0005,B02510,2021-01-01 00:06:59,2021-01-01 00:43:01,88,42,\r\n",
      "\r\n",
      "HV0005,B02510,2021-01-01 00:50:00,2021-01-01 01:04:57,42,151,\r\n",
      "\r\n",
      "HV0003,B02764,2021-01-01 00:14:30,2021-01-01 00:50:27,71,226,\r\n",
      "\r\n",
      "HV0003,B02875,2021-01-01 00:22:54,2021-01-01 00:30:20,112,255,\r\n",
      "\r\n"
     ]
    }
   ],
   "source": [
    "!head -n 10 head.csv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "aa1b0e18",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
